
package com.shiku.imserver.service;


import com.shiku.commons.task.TimerTask;
import com.shiku.imserver.CoreService;
import com.shiku.imserver.common.IService;
import com.shiku.imserver.common.message.AbstractMessage;
import com.shiku.imserver.common.message.ChatMessage;
import com.shiku.imserver.common.packets.ImPacket;
import com.shiku.imserver.common.utils.StringUtils;
import com.shiku.imserver.message.MessageFactory;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;


/**
 * Tracks chat messages that are waiting for a receipt (ACK) on a channel and
 * resends them when the receipt does not arrive in time.
 *
 * <p>Per channel, two attributes are kept on the {@link ChannelContext}:
 * <ul>
 *   <li>{@code "message_map"} - message id to pending {@link Entry};</li>
 *   <li>{@code "waitack_task"} - the periodic {@link SkAckTimeoutTask} that scans that map.</li>
 * </ul>
 *
 * <p>An entry that has waited at least {@link #ACKTIMEOUT} ms is resent while the
 * channel is open and fewer than {@link #RESEND_COUNT} resends were made. Otherwise
 * it is dropped from the map and, for chat type {@code 1}, handed to the message
 * repository as an offline message.
 */
public class ReceiptLogicService
        implements IService {
    private static final Logger log = LoggerFactory.getLogger(ReceiptLogicService.class);

    /** Milliseconds an entry may wait for its receipt before the scan acts on it. */
    private static final long ACKTIMEOUT = 15000L;

    /** Maximum number of resends for one entry before it is given up. */
    private static final long RESEND_COUNT = 5L;

    /** Channel attribute key holding the map of pending entries. */
    private static final String MESSAGE_MAP = "message_map";

    /** Channel attribute key holding the channel's scan task. */
    private static final String OUT_WAIT_TASK = "waitack_task";

    /** Initial delay and period, in seconds, of the per-channel scan. */
    private static final long SCAN_PERIOD_SECONDS = 10L;

    /**
     * Chat type whose unacknowledged messages are stored offline.
     * NOTE(review): presumably one-to-one chat - confirm against the message head definition.
     */
    private static final int OFFLINE_CHAT_TYPE = 1;

    /** Guards the lazy creation of the per-channel map and scan task. */
    private final Object trackingLock = new Object();

    private ScheduledExecutorService ecutorScheduler;


    /**
     * Creates the scheduler that runs the per-channel scan tasks.
     *
     * @return always {@code true}
     */
    @Override
    public boolean initialize() {
        this.ecutorScheduler = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
        return true;
    }


    /**
     * Registers {@code message} as waiting for a receipt on {@code channelContext},
     * or refreshes its timestamp when it is already registered.
     *
     * @param channelContext channel the message was sent on; ignored when {@code null}
     * @param message        message awaiting a receipt; ignored when {@code null}
     */
    public void willMessageReceipt(ChannelContext channelContext, ChatMessage message) {
        if (null == channelContext || null == message) {
            return;
        }

        Map<String, Entry> messageMap = ensureTracking(channelContext);
        String messageId = message.getMessageHead().getMessageId();

        // Atomic insert-or-refresh, so two concurrent registrations of the same id
        // cannot replace each other's entry (and lose its resend count).
        Entry existing = messageMap.putIfAbsent(messageId, new Entry(message));
        if (null != existing) {
            existing.refreshStamp();
        }
    }


    /**
     * Drops the given messages from the pending map because their receipt arrived.
     *
     * @param channelContext channel the receipt came from
     * @param messageId      one message id, or several separated by {@code ','}
     * @param roomJid        not used by this method
     */
    public void removeMessage(ChannelContext channelContext, String messageId, String roomJid) {
        if (null == channelContext || null == messageId) {
            return;
        }

        Map<String, Entry> messageMap = pendingMessages(channelContext);
        if (null == messageMap) {
            return;
        }

        for (String msgId : messageId.split(",")) {
            if (!StringUtils.isEmpty(msgId)) {
                messageMap.remove(msgId);
            }
        }
    }


    /**
     * Stops the channel's scan task and flushes every still-pending message:
     * messages of chat type {@code 1} are stored offline, the rest are dropped.
     *
     * @param channelContext channel that is logging out
     */
    public void logout(ChannelContext channelContext) {
        if (null == channelContext) {
            return;
        }

        // Stop the scan first so it cannot process the same entries while we flush them.
        SkAckTimeoutTask ackTimeTask = (SkAckTimeoutTask) channelContext.getAttribute(OUT_WAIT_TASK);
        if (null != ackTimeTask) {
            ackTimeTask.haltSchedule();
        }

        Map<String, Entry> messageMap = pendingMessages(channelContext);
        if (null == messageMap) {
            return;
        }

        for (Entry entry : messageMap.values()) {
            ChatMessage message = entry.getMessage();
            try {
                // remove(key, value) succeeds for exactly one caller, so a message that was
                // acknowledged or already handled by the scan is not stored a second time.
                if (messageMap.remove(message.getMessageHead().getMessageId(), entry)) {
                    storeOfflineIfEligible(message);
                }
            } catch (RuntimeException e) {
                // Keep flushing the remaining entries even if one of them fails.
                log.error("failed to flush pending message on logout", e);
            }
        }
    }


    @Override
    public boolean startupAfter() {
        return false;
    }


    /**
     * Returns the channel's pending map, creating the map and (re)starting the scan
     * task when either is missing or the task has been halted.
     */
    private Map<String, Entry> ensureTracking(ChannelContext channelContext) {
        Map<String, Entry> messageMap = pendingMessages(channelContext);
        SkAckTimeoutTask task = (SkAckTimeoutTask) channelContext.getAttribute(OUT_WAIT_TASK);
        if (null != messageMap && null != task && !task.isHalted()) {
            return messageMap;
        }

        // Slow path: serialize creation so concurrent senders cannot install two maps
        // or schedule two scan tasks for the same channel.
        synchronized (this.trackingLock) {
            messageMap = pendingMessages(channelContext);
            if (null == messageMap) {
                messageMap = new ConcurrentHashMap<>();
                channelContext.setAttribute(MESSAGE_MAP, messageMap);
            }

            task = (SkAckTimeoutTask) channelContext.getAttribute(OUT_WAIT_TASK);
            if (null == task || task.isHalted()) {
                task = new SkAckTimeoutTask(channelContext);
                // Stored so that logout can find and stop it.
                channelContext.setAttribute(OUT_WAIT_TASK, task);
                ScheduledFuture<?> handle = this.ecutorScheduler.scheduleAtFixedRate(
                        (Runnable) task, SCAN_PERIOD_SECONDS, SCAN_PERIOD_SECONDS, TimeUnit.SECONDS);
                task.bindSchedule(handle);
            }
            return messageMap;
        }
    }


    /** Reads the pending map attribute of the channel; {@code null} when none was created yet. */
    @SuppressWarnings("unchecked")
    private static Map<String, Entry> pendingMessages(ChannelContext channelContext) {
        return (Map<String, Entry>) channelContext.getAttribute(MESSAGE_MAP);
    }


    /** Stores {@code message} as an offline message when its chat type is {@code 1}. */
    private static void storeOfflineIfEligible(ChatMessage message) {
        if (OFFLINE_CHAT_TYPE == message.getMessageHead().getChatType()) {
            IMBeanUtils.getMessageRepository().storeChatOffMessage(message);
        }
    }


    /**
     * Periodic scan over one channel's pending map that resends or gives up on
     * entries whose receipt is overdue.
     */
    private static class SkAckTimeoutTask
            extends TimerTask {
        private final ChannelContext channelContext;

        /** Handle returned by the scheduler; used to really stop the periodic execution. */
        private volatile ScheduledFuture<?> scheduleHandle;

        /** Set once the task was stopped; a halted task is replaced on the next registration. */
        private volatile boolean halted;


        public SkAckTimeoutTask(ChannelContext channelContext) {
            this.channelContext = channelContext;
        }


        /** Remembers the scheduler handle; cancels it at once if the task was halted meanwhile. */
        void bindSchedule(ScheduledFuture<?> handle) {
            this.scheduleHandle = handle;
            if (this.halted) {
                handle.cancel(false);
            }
        }


        /** Stops further scans of this task. */
        void haltSchedule() {
            this.halted = true;
            // Inherited cancel(), as logout has always invoked it.
            // NOTE(review): whether it also stops the executor schedule is not visible here,
            // hence the explicit cancel of the handle below.
            cancel();
            ScheduledFuture<?> handle = this.scheduleHandle;
            if (null != handle) {
                handle.cancel(false);
            }
        }


        boolean isHalted() {
            return this.halted;
        }


        @Override
        public void run() {
            if (this.halted || null == this.channelContext) {
                return;
            }

            Map<String, Entry> messageMap = pendingMessages(this.channelContext);
            if (null == messageMap) {
                return;
            }

            long cuTime = System.currentTimeMillis();

            for (Entry entry : messageMap.values()) {
                try {
                    handleEntry(messageMap, entry, cuTime);
                } catch (RuntimeException e) {
                    // An exception escaping run() would make the executor suppress every
                    // later execution of this task, so it is contained per entry.
                    log.error("ack timeout handling failed for {}", this.channelContext.userid, e);
                }
            }
        }


        /** Applies the timeout policy to a single pending entry. */
        private void handleEntry(Map<String, Entry> messageMap, Entry entry, long cuTime) {
            if (cuTime - entry.getStamp() < ACKTIMEOUT) {
                return;
            }

            ChatMessage message = entry.getMessage();
            String messageId = message.getMessageHead().getMessageId();
            boolean retriesLeft = RESEND_COUNT > entry.getReCount();

            if (retriesLeft && !this.channelContext.isClosed) {
                // Keep the entry and count the attempt; removing it here (as before) reset
                // the counter on every resend, so the retry limit could never be reached.
                entry.addReCount();

                ImPacket packet = MessageFactory.convertToImPacket((AbstractMessage) message);
                packet.setMessage(message);
                CoreService.send(this.channelContext, packet);
                return;
            }

            if (!retriesLeft) {
                // Log text: "no receipt after 5 attempts, resend cancelled".
                log.info(" {}  5次未收到回执  取消重发   》 {}", this.channelContext.userid, messageId);
            }

            // Only the caller that actually removes the entry stores it offline, so a
            // receipt or logout racing with this scan cannot cause a duplicate store.
            if (messageMap.remove(messageId, entry)) {
                storeOfflineIfEligible(message);
            }
        }

    }


    /** A message waiting for its receipt, with the time it was last sent and its resend count. */
    private static class Entry {
        private final ChatMessage message;

        /** Written by sender threads and read by the scan thread, hence volatile. */
        private volatile long stamp = System.currentTimeMillis();

        /** Only touched by the channel's scan task. */
        private int reCount = 0;


        public Entry(ChatMessage message) {
            this.message = message;
        }


        public long getStamp() {
            return this.stamp;
        }


        public void refreshStamp() {
            this.stamp = System.currentTimeMillis();
        }


        /** Counts one resend and restarts the receipt timeout. */
        public void addReCount() {
            this.reCount++;
            refreshStamp();
        }


        public int getReCount() {
            return this.reCount;
        }


        public ChatMessage getMessage() {
            return this.message;
        }

    }

}


